package top.xiesen.stream.source;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;

/**
 * @Description 把 Collection 集合作为数据源
 * @className top.xiesen.stream.source.StreamingFromCollection
 * @Author 谢森
 * @Email xiesen310@163.com
 * @Date 2020/2/9 20:02
 */
public class StreamingFromCollection {
    public static void main(String[] args) throws Exception {
        // 获取 flink 运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        ArrayList<Integer> data = new ArrayList<>();
        data.add(10);
        data.add(15);
        data.add(20);

        // 指定数据源
        DataStreamSource<Integer> collectionData = env.fromCollection(data);

        // 通过 map 对数据进行处理
        DataStream<Integer> num = collectionData.map(new MapFunction<Integer, Integer>() {
            @Override
            public Integer map(Integer value) throws Exception {
                return value + 1;
            }
        });

        // 直接打印输出
        num.print().setParallelism(1);
        env.execute("StreamingFromCollection");

    }
}
